feat(records): add list (filter) endpoint with read data classes #2680
andersfylling wants to merge 9 commits into
Codecov Report
✅ All modified and coverable lines are covered by tests.
@@ Coverage Diff @@
## master #2680 +/- ##
==========================================
+ Coverage 93.65% 93.67% +0.01%
==========================================
Files 498 498
Lines 50391 50542 +151
==========================================
+ Hits 47196 47347 +151
Misses 3195 3195
712d288 to eeb284f
Add RecordsAPI.list, a cursorless POST to /streams/{streamId}/records/filter
returning a RecordList (max 1000 records). Introduces the records read model:
- Record / RecordList read data classes (clean CogniteResource, no RecordId
multiple-inheritance; RecordList carries optional `typing`).
- TimeRange (gte/gt/lte/lt) for last_updated_time and RecordSourceSelector for
source/property selection; reuse the data-modeling Filter DSL, InstanceSort,
and TypeInformation.
- Add a READ op to RecordsConcurrencyOperation + a read semaphore to the
records concurrency config (reads no longer borrow the write semaphore).
- Make _records_url append the path suffix literally (encode only stream_id)
so "/filter" isn't percent-encoded.
Also register the records API module in the docstring-example doctest runner.
target_units is intentionally deferred to a follow-up.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
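The URL change in the commit message above can be illustrated with a minimal sketch. The function name `records_url` is a hypothetical stand-in for the SDK's private `_records_url` helper, whose real signature is not shown in this thread; only the idea (percent-encode the stream ID, append the suffix literally) comes from the commit message.

```python
from urllib.parse import quote


def records_url(stream_id: str, suffix: str = "") -> str:
    """Hypothetical stand-in for _records_url (real signature not shown here)."""
    # Encode only the user-supplied stream_id; safe="" also encodes "/".
    encoded_id = quote(stream_id, safe="")
    # The suffix (e.g. "/filter") is appended as-is, so its slash survives.
    return f"/streams/{encoded_id}/records{suffix}"
```

Encoding the whole path instead would turn "/filter" into "%2Ffilter", which is the failure mode the commit describes.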
eeb284f to 4f3a737
…l pyproject.toml deps
Switch list() from direct _post() to the _list() helper (matching the instances API pattern), and remove accidentally added Poetry 1.x dependency sections (including ty as a runtime dep).
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…on all endpoints
- Pass resource_path to _list() so it doesn't fall back to the missing
  _RESOURCE_PATH class var on RecordsAPI, fixing test failures.
- Add module-level _DEFAULT_STREAM_TYPE = "immutable" constant.
- _get_semaphore now requires stream_type explicitly (no default).
- All endpoints (delete, ingest, upsert, list) accept stream_type and pass an
  explicit semaphore, ready for future HierarchicalBoundedSemaphore.
- Regenerate sync API.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…limit)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Code Review
This pull request adds support for filtering and listing records in a stream by introducing the list method to both the async and sync RecordsAPI, along with supporting data classes (Record, RecordList, TimeRange, RecordSourceSelector) and unit tests. Feedback on the changes suggests enhancing type safety by using Literal["immutable", "mutable"] instead of str for the stream_type parameter. Additionally, it is recommended to remove the unused stream_type parameter from _get_semaphore to prevent Liskov Substitution Principle violations and avoid unnecessary type-ignore comments.
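The type-safety suggestion in the review summary could look roughly like the sketch below. The alias name `StreamType` appears in the diff excerpts of this PR, but its actual definition is not shown, so the definition here is an assumption; `validate_stream_type` is a hypothetical helper added purely for illustration.

```python
from typing import Literal, cast, get_args

# Assumed definition; the PR only shows the alias being used, not declared.
StreamType = Literal["immutable", "mutable"]


def validate_stream_type(value: str) -> StreamType:
    """Hypothetical runtime check that mirrors the static Literal type."""
    allowed = get_args(StreamType)  # ("immutable", "mutable")
    if value not in allowed:
        raise ValueError(f"stream_type must be one of {allowed}, got {value!r}")
    return cast(StreamType, value)
```

With a `Literal` annotation, a type checker rejects a call such as `list(..., stream_type="imutable")` before it runs, which a plain `str` annotation cannot do.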
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
haakonvt
left a comment
Reviewed the API implementation, will try to cover data classes + tests tomorrow
| _OPERATION_TO_RATE_LIMIT: ClassVar[dict[str, RecordsConcurrencyOperation]] = { | ||
| "read": RecordsConcurrencyOperation.READ, | ||
| "write": RecordsConcurrencyOperation.WRITE, | ||
| "delete": RecordsConcurrencyOperation.WRITE, | ||
| } |
I still think this is an unnecessary indirection 😄
There was a problem hiding this comment.
I'm cleaning it up here: #2688
The concurrency setup is a bit complicated, so I wrote something dumbed down in this PR.
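For readers following the discussion, here is a simplified sketch of the mapping under review: operation names resolve to a rate-limit category, and each category owns its own semaphore, so reads no longer borrow the write semaphore. The class names and the limit values are hypothetical; the SDK's real concurrency config is more involved and is being reworked in #2688.

```python
import asyncio
from enum import Enum


class ConcurrencyOperation(Enum):  # hypothetical stand-in for RecordsConcurrencyOperation
    READ = "read"
    WRITE = "write"


# Mirrors the mapping quoted above: delete shares the write limit.
OPERATION_TO_RATE_LIMIT = {
    "read": ConcurrencyOperation.READ,
    "write": ConcurrencyOperation.WRITE,
    "delete": ConcurrencyOperation.WRITE,
}


class RecordsConcurrency:  # hypothetical config holder
    def __init__(self, read_limit: int, write_limit: int) -> None:
        # One semaphore per category (limits are made-up example values).
        self._semaphores = {
            ConcurrencyOperation.READ: asyncio.Semaphore(read_limit),
            ConcurrencyOperation.WRITE: asyncio.Semaphore(write_limit),
        }

    def get_semaphore(self, operation: str) -> asyncio.Semaphore:
        return self._semaphores[OPERATION_TO_RATE_LIMIT[operation]]
```

The indirection the reviewer questions is the string-to-enum step; passing the enum member directly would remove one lookup at the cost of exposing the enum to callers.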
| items: RecordId | Sequence[RecordId], | ||
| *, | ||
| stream_id: str, | ||
| stream_type: StreamType = _DEFAULT_STREAM_TYPE, |
I don't understand the usage here at all - it seems to only be used by _get_semaphore? 🤔
There was a problem hiding this comment.
It will be related to an upcoming concurrency config: #2688
Right now it's a no-op, but all operations are bound to a stream type.
| identifiers=RecordIdSequence.load(items), | ||
| wrap_ids=True, | ||
| resource_path=self._records_url(stream_id), | ||
| override_semaphore=self._get_semaphore("delete", stream_type), |
If you can't infer stream_type, you need to collapse this to a single semaphore setting that works in all cases
There was a problem hiding this comment.
I don't think I quite understand, could you elaborate?
| (max 1000). To page over a large time window, issue multiple calls with partitioned | ||
| ``last_updated_time`` ranges. |
We need to support pagination, it is one of the key features of the SDK 😄
There was a problem hiding this comment.
Pagination is not a concept for this API endpoint, sadly.
There was a problem hiding this comment.
The SDK should still do it to avoid 100 customers doing it in 100 wrong ways xD
There was a problem hiding this comment.
Are you saying we should postpone adding streams and records to the SDK until we can add paging in the backend?
There was a problem hiding this comment.
No 😅 I have not used records, but I just assumed you can/need to paginate by passing e.g. the last timestamp + 1 and so on.
There was a problem hiding this comment.
I read your docstring now lol. I want the SDK to handle this automatically:
To page over a large time window, issue multiple calls with partitioned…
There was a problem hiding this comment.
Yeah, my view is that we can do it, but I don't think we should support niche pagination. Unless the API supports it, I don't want to implement this for every SDK.
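To make the workaround from the docstring concrete, here is a minimal sketch of partitioning a time window into consecutive, non-overlapping ranges. `partition_time_range` is a hypothetical helper and is not part of the SDK or this PR; the `gte`/`lt` keys follow the TimeRange bounds named in the first commit message, and the millisecond unit is an assumption.

```python
from collections.abc import Iterator


def partition_time_range(start_ms: int, end_ms: int, window_ms: int) -> Iterator[dict[str, int]]:
    """Yield half-open [gte, lt) windows that together cover [start_ms, end_ms)."""
    if window_ms <= 0:
        raise ValueError("window_ms must be positive")
    lower = start_ms
    while lower < end_ms:
        upper = min(lower + window_ms, end_ms)
        # Half-open bounds avoid returning a boundary record twice.
        yield {"gte": lower, "lt": upper}
        lower = upper
```

Each window would then be passed as the `last_updated_time` range of one list call. Since a single call returns at most 1000 records, a caller would still need to shrink the window whenever a response comes back full, which is part of why the thread debates whether the SDK should own this logic.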
| filter: Filter | None = None, | ||
| sources: Sequence[RecordSourceSelector] | None = None, | ||
| sort: Sequence[InstanceSort] | InstanceSort | None = None, | ||
| limit: int = 10, |
We have a default constant that should be used
| filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`). | ||
| sources (Sequence[RecordSourceSelector] | None): Which container properties to return. | ||
| sort (Sequence[InstanceSort] | InstanceSort | None): Sort specification(s); up to 5. | ||
| limit (int): Maximum number of records to return (1-1000). Defaults to 10. |
There was a problem hiding this comment.
| - limit (int): Maximum number of records to return (1-1000). Defaults to 10. |
| + limit (int): Maximum number of records to return (1-1000). |
| self, | ||
| stream_id: str, | ||
| *, | ||
| stream_type: StreamType = _DEFAULT_STREAM_TYPE, |
Same argument as above, this will confuse users. If they specify the wrong value here, the list method will still work.
| stream_type: StreamType = _DEFAULT_STREAM_TYPE, |
I'll remove the stream_type parameter for now and make it part of the follow-up PR. Please ignore these for now.
…ment
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Adds list() (filter) to the Records API. POST /streams/{id}/records/filter → RecordList, with support for filtering by time range, DM filter expressions, source selection, sorting, and optional type information.
Added stream type as future rate limits will depend on it. For now I've implemented the lowest rate limit, which is related to immutable streams.
https://cognitedata.atlassian.net/browse/HVD-1262